/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package studio.raptor.ddal.core.transaction;

import com.google.common.base.Splitter;
import com.google.common.io.Files;
import com.google.common.io.PatternFilenameFilter;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import studio.raptor.ddal.common.util.RuntimeUtil;

/**
 * 事务日志记录器
 *
 * @author Sam
 * @since 3.0.0
 */
class MainLineTransactionLogger extends AbstractTransactionLogger {

  private static final String TX_LOG_NAME = "ddal.transaction";
  private static MainLineTransactionLogger delegate = new MainLineTransactionLogger();
  private Logger txLogger;

  private MainLineTransactionLogger() {
    super(RuntimeUtil.getTransactionRecordLog(), TX_LOG_NAME,
            DOT_JOINER.join(TX_LOG_NAME, LOG_SUFFIX),
            DOT_JOINER.join(TX_LOG_NAME, ARCHIVE_LAYOUT_PATTERN, LOG_SUFFIX),
            System.getProperty("ddal.tx.log.rollingsize"));
    super.registerLogConfig();
    txLogger = LogManager.getLogger(TX_LOG_NAME);
    startupArchiveTask();
  }

  static MainLineTransactionLogger instance() {
    return delegate;
  }

  @Override
  public void log(String message) {
    delegate.txLogger.log(Level.INFO, message);
  }

  private void startupArchiveTask() {
    Thread thread = new Thread(new FinishedTxArchiveTask(RuntimeUtil.getTransactionRecordLog()));
    thread.setDaemon(true);
    thread.setName("TransactionLogArchiveTask");
    thread.start();
  }

  /**
   * 日志文件格式：
   * 20170302104301646|3.0.0.1000|1|school|170302104257486-8|UPDATE ddal_test_0.student SET sphone = '18052029122' WHERE sno = 200901020104|
   * 20170302104301647|170302104257486-8|COMMITTED
   */
  static class FinishedTxArchiveTask implements Runnable {
    private static org.slf4j.Logger logger = LoggerFactory.getLogger(FinishedTxArchiveTask.class);
    private final Splitter lineSplitter = Splitter.on('|');
    private Map<String, TxFileStatus> filesOnProcess = new HashMap<>(); // 正在处理文件列表
    private Set<String> committedTransactionIdList = new HashSet<>();
    private final String TX_LOG_PATH;

    private FinishedTxArchiveTask(String transactionLogPath) {
      this.TX_LOG_PATH = transactionLogPath;
    }

    @Override
    public void run() {
      while (true) {
        String[] txFiles = listTxFiles();
        if (committedTransactionIdList.size() > 0) {
          logger.info("Transaction id bucket size: {}", committedTransactionIdList.size());
        }
        if (null != txFiles && txFiles.length > 0) {
          logger.info("Transaction rolling files under path [{}] have been found, files: {}", TX_LOG_PATH, Arrays.toString(txFiles));
          for (String txFile : txFiles) {
            MappedByteBuffer mbb;
            try {
              mbb = Files.map(new File(TX_LOG_PATH + File.separator + txFile), FileChannel.MapMode.READ_ONLY);
            } catch (IOException ioe) {
              logger.error(String.format("Read transaction log file failed, log file is %s", txFile), ioe);
              continue;
            }
            if (!filesOnProcess.keySet().contains(txFile)) {
              loadCommittedTxIdIntoMemory(mbb, txFile);
              filesOnProcess.put(txFile, new TxFileStatus(true));
            }
            checkTxAllCommitted(mbb, txFile);
            if (filesOnProcess.get(txFile).isAllCommit()) {
              archiveTxLogFile(txFile);
              deleteCommittedTxIdFromMemory(mbb, txFile);
            }
          }
        }
        try {
          Thread.sleep(10000);
        } catch (InterruptedException ignore) {
        }
      }
    }

    private void deleteCommittedTxIdFromMemory(MappedByteBuffer mbb, String fileName) {
      readFileAndDo(mbb, fileName, new LineProcessor() {
        private String name;

        @Override
        public LineProcessor withName(String name) {
          this.name = name;
          return this;
        }

        @Override
        public String getName() {
          return name;
        }

        @Override
        public void process(String line) {
          List<String> txfLineSplits = lineSplitter.splitToList(line);
          if (txfLineSplits.size() > 3) {
            committedTransactionIdList.remove(txfLineSplits.get(4));
          }
        }
      }.withName("DeleteCommittedTxIdFromMemory"));
    }

    /**
     * 将日志文件中已提交的事务ID加载到内存中。
     * DDAL事务日志提交日志的格式：
     * 20170302104300450|170302104257486-5|COMMITTED
     *
     * @param mbb  日志文件在内存中的镜像
     * @param fileName 日志文件名
     */
    private void loadCommittedTxIdIntoMemory(MappedByteBuffer mbb, String fileName) {
      readFileAndDo(mbb, fileName, new LineProcessor() {
        private String name;

        @Override
        public LineProcessor withName(String name) {
          this.name = name;
          return this;
        }

        @Override
        public String getName() {
          return this.name;
        }

        @Override
        public void process(String line) {
          List<String> txfLineSplits = lineSplitter.splitToList(line);
          if (txfLineSplits.size() == 3) {
            committedTransactionIdList.add(txfLineSplits.get(1));
          }
        }
      }.withName("LoadTransactionIdIntoMemory"));
    }

    /**
     * 检查日志文件中事务是否都已提交。
     * 默认标记当前日志文件中的日志为已全部提交，遍历时发现有未提交事务时
     * 就抛异常，退出当前文件的遍历。
     *
     * @param mbb 文件内存镜像
     * @param fileName 文件名
     */
    private void checkTxAllCommitted(MappedByteBuffer mbb, String fileName) {
      try {
        readFileAndDo(mbb, fileName, new LineProcessor() {
          String name;

          @Override
          public LineProcessor withName(String name) {
            this.name = name;
            return this;
          }

          @Override
          public String getName() {
            return this.name;
          }

          @Override
          public void process(String line) {
            List<String> txfLineSplits = lineSplitter.splitToList(line);
            if (txfLineSplits.size() > 3) {
              if (!committedTransactionIdList.contains(txfLineSplits.get(4))) {
                throw new RuntimeException("uncommitted transaction found, " + txfLineSplits.get(4));
              }
            }
          }
        }.withName("CheckTransactionsInFileIsAllCommitted"));
      } catch (RuntimeException re) {
        filesOnProcess.get(fileName).hasUncommittedTx();
      }
    }

    private void readFileAndDo(MappedByteBuffer mbb, String fileName, LineProcessor lineProcessor) {
      ByteArrayOutputStream lineStream = null;
      mbb.position(0);// read file from start
      try {
        lineStream = new ByteArrayOutputStream();
        while (mbb.hasRemaining()) {
          byte b = mbb.get();
          if (b == '\n' && lineStream.size() > 0) {
            lineProcessor.process(lineStream.toString("UTF-8"));
            lineStream.reset();
          } else {
            lineStream.write(b);
          }
        }
      } catch (Exception e) {
        logger.error(String.format("Read transaction log file [%s] failed with process of [%s]",
                fileName, lineProcessor.getName()), e);
      } finally {
        try {
          if (null != lineStream) {
            lineStream.close();
          }
        } catch (IOException ignore) {
        }
      }
    }

    interface LineProcessor {
      LineProcessor withName(String name);

      String getName();

      void process(String line);
    }

    static class TxFileStatus {
      private boolean allCommit;

      TxFileStatus(boolean isAllCommit) {
        this.allCommit = isAllCommit;
      }

      boolean isAllCommit() {
        return allCommit;
      }

      void hasUncommittedTx() {
        this.allCommit = false;
      }
    }

    private void archiveTxLogFile(String fileName) {
      try {
        Files.move(new File(TX_LOG_PATH + File.separator + fileName), new File(TX_LOG_PATH + File.separator + fileName + ".archived"));
        filesOnProcess.remove(fileName);
        logger.info("Transaction log file {} has been archived", fileName);
      } catch (IOException ioe) {
        logger.error(String.format("Archive log file failed, log file name is [%s]", fileName), ioe);
      }
    }

    private String[] listTxFiles() {
      File txFilePath = new File(TX_LOG_PATH);
      if (!txFilePath.exists() || !txFilePath.isDirectory()) {
        throw new RuntimeException(String.format("[%s] is not a valid transaction log path", TX_LOG_PATH));
      }
      return txFilePath.list(new PatternFilenameFilter(".*ddal.transaction.*\\.\\d{1,}\\.log"));
    }
  }
}
